package com.destroyer.common.tool;


import com.destroyer.common.thread.MyThreadFactory;
import com.destroyer.common.util.Func;
import lombok.Getter;

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

/**
 * Bounded producer/consumer buffer with a single background consumer.
 *
 * <p>Producers hand items over with {@link #produce(Object)} or
 * {@link #produceList(List)}; when the buffer is full they block until the
 * consumer has freed space. Items are delivered to a user callback either in
 * batches ({@link IAllConsumer}) or one at a time ({@link ISignalConsumer}),
 * continuously or on a fixed delay depending on which {@code builder} overload
 * created the instance.
 *
 * <p>All buffer state is guarded by the monitor of {@code Factory.dataMap};
 * callbacks are invoked without holding that monitor.
 *
 * @param <T> type of the buffered items
 * @Author chenp
 * @Date 2022/3/2 15:24
 **/

public class ProducerAndConsumer<T> {
    /** Buffer capacity used by every {@code builder} overload. */
    private static final int DEFAULT_CAPACITY = 1000;

    @Getter
    protected final Factory<T> factory;

    /** Executor that runs the consumer; kept so that {@link #shutdown()} can stop it. */
    private final ExecutorService executor;

    /**
     * Creates the buffer and starts its consumer.
     *
     * @param max            buffer capacity
     * @param signalConsumer per-item callback, or {@code null}
     * @param allConsumer    batch callback, or {@code null}; takes precedence over {@code signalConsumer}
     * @param waitTime       {@code 0} to consume continuously on a dedicated thread, otherwise the
     *                       initial and fixed delay between consumption rounds
     * @param timeUnit       unit of {@code waitTime}; only used when {@code waitTime != 0}
     * @param name           optional; the first element is handed to {@code MyThreadFactory}
     *                       (presumably as the thread name)
     */
    private ProducerAndConsumer(int max,
                                ISignalConsumer<T> signalConsumer,
                                IAllConsumer<T> allConsumer,
                                int waitTime,
                                TimeUnit timeUnit,
                                String... name) {
        this.factory = new Factory<>(max, signalConsumer, allConsumer);
        if (waitTime != 0) {
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
                    new MyThreadFactory(Func.isNotEmpty(name) ? name[0] : "PACScheduledExecutor"));
            // An exception escaping a periodic task cancels every later run, so callback
            // failures are reported here instead of being allowed to propagate.
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                try {
                    this.factory.remove();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, waitTime, waitTime, timeUnit);
            this.executor = scheduledExecutorService;
        } else {
            ExecutorService executorService = Executors.newSingleThreadExecutor(
                    new MyThreadFactory(Func.isNotEmpty(name) ? name[0] : "ProducerAndConsumer"));
            executorService.submit(new ConsumerThread<>(this));
            this.executor = executorService;
        }
    }

    /**
     * Batch mode: the consumer thread continuously delivers everything that is
     * buffered to {@code allConsumer}.
     *
     * @param allConsumer batch callback
     * @param name        optional thread name (first element is used)
     * @return a started instance
     */
    public static <T> ProducerAndConsumer<T> builder(IAllConsumer<T> allConsumer, String... name) {
        return new ProducerAndConsumer<>(DEFAULT_CAPACITY, null, allConsumer, 0, null, name);
    }

    /**
     * Delayed batch mode: everything that is buffered is delivered to
     * {@code allConsumer} with a fixed delay between rounds.
     *
     * @param allConsumer batch callback
     * @param waitTime    initial delay and delay between rounds
     * @param timeUnit    unit of {@code waitTime}
     * @param name        optional thread name (first element is used)
     * @return a started instance
     */
    public static <T> ProducerAndConsumer<T> builder(IAllConsumer<T> allConsumer, int waitTime, TimeUnit timeUnit, String... name) {
        return new ProducerAndConsumer<>(DEFAULT_CAPACITY, null, allConsumer, waitTime, timeUnit, name);
    }

    /**
     * Single-item mode: the consumer thread delivers one item per round to
     * {@code signalConsumer}.
     *
     * @param signalConsumer per-item callback
     * @param name           optional thread name (first element is used)
     * @return a started instance
     */
    public static <T> ProducerAndConsumer<T> builder(ISignalConsumer<T> signalConsumer, String... name) {
        return new ProducerAndConsumer<>(DEFAULT_CAPACITY, signalConsumer, null, 0, null, name);
    }


    /**
     * Adds one item, blocking while the buffer is full.
     *
     * @param t item to add; must not be {@code null}
     */
    public void produce(T t) {
        this.factory.produce(t);
    }

    /**
     * Adds every element of {@code list} in order, blocking whenever the buffer is full.
     *
     * @param list items to add; must not be {@code null}
     */
    public void produceList(List<T> list) {
        this.factory.produceList(list);
    }

    /**
     * Removes, without delivering them to the callback, all buffered items for
     * which {@code function} yields a non-null value equal to {@code r}.
     *
     * @param function extracts the value to compare from an item
     * @param r        value to match
     * @param <R>      type of the compared value
     */
    public <R> void consumeAll(Function<T, R> function, R r) {
        this.factory.consumeAll(function, r);
    }

    /**
     * Stops the background consumer by interrupting it. Items still buffered
     * are not delivered, and producers that later find the buffer full will
     * block because nothing drains it any more.
     */
    public void shutdown() {
        executor.shutdownNow();
    }

    /**
     * Callback receiving items one at a time.
     *
     * @param <T> item type
     */
    @FunctionalInterface
    public interface ISignalConsumer<T> {
        void consume(T t);
    }

    /**
     * Callback receiving all currently buffered items as one batch.
     *
     * @param <T> item type
     */
    @FunctionalInterface
    public interface IAllConsumer<T> {
        void consume(List<T> t);
    }

    /**
     * The bounded buffer itself. Every read and write of its state happens
     * while holding the monitor of {@link #dataMap}; that same monitor is used
     * for wait/notify between producers and the consumer.
     */
    static class Factory<T> {
        final ConcurrentHashMap<Long, T> dataMap = new ConcurrentHashMap<>();
        int max;// capacity of the buffer
        int num;// number of buffered items; mirrors dataMap.size()
        Long mapIndex = 0L;// key given to the most recently added item; only ever increases
        Long consumeIndex;// value of mapIndex when the latest remove() round started
        ISignalConsumer<T> consumer;
        IAllConsumer<T> allConsumer;

        /**
         * @param max         capacity, must be positive
         * @param consumer    per-item callback, or {@code null}
         * @param allConsumer batch callback, or {@code null}
         * @throws IllegalArgumentException if {@code max} is not positive (producers could never proceed)
         */
        public Factory(int max, ISignalConsumer<T> consumer, IAllConsumer<T> allConsumer) {
            if (max <= 0) {
                throw new IllegalArgumentException("max must be positive: " + max);
            }
            this.max = max;
            this.consumer = consumer;
            this.allConsumer = allConsumer;
        }

        /**
         * Adds the elements of {@code list} one by one, in order, waiting for
         * space before each element. Lists larger than the capacity are
         * therefore accepted; they are fed in as the consumer drains the
         * buffer. If the calling thread is interrupted while waiting, the
         * remaining elements are not added and the interrupt flag stays set.
         *
         * @throws NullPointerException if {@code list} or one of its elements is {@code null}
         */
        void produceList(List<T> list) {
            Objects.requireNonNull(list, "list");
            for (T item : list) {
                if (!enqueue(item)) {
                    return;
                }
            }
        }

        /**
         * Adds one item, waiting while the buffer is full. If the calling
         * thread is interrupted while waiting, the item is not added and the
         * interrupt flag stays set. Calling this from inside a consumer
         * callback can block forever when the buffer is full, because the
         * consumer is then waiting on itself.
         *
         * @throws NullPointerException if {@code t} is {@code null}
         */
        void produce(T t) {
            enqueue(t);
        }

        /**
         * Waits for free space and stores {@code item} under a fresh key.
         *
         * @return {@code false} if the thread was interrupted before the item could be stored
         */
        private boolean enqueue(T item) {
            // ConcurrentHashMap rejects null values; fail before touching any state.
            Objects.requireNonNull(item, "item");
            synchronized (dataMap) {
                while (dataMap.size() >= max) {
                    try {
                        // Buffer is full: wait until the consumer frees space.
                        dataMap.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
                dataMap.put(++mapIndex, item);
                num = dataMap.size();
                // Producers and the consumer share this monitor, so wake everyone
                // and let each waiter re-check its own condition.
                dataMap.notifyAll();
                return true;
            }
        }

        /**
         * Runs one consumption round.
         *
         * <ul>
         *   <li>Empty buffer: waits until notified (or interrupted) and returns
         *       without consuming; callers invoke this method repeatedly.</li>
         *   <li>Batch callback set: delivers a snapshot of all buffered items,
         *       oldest first, then removes exactly those items.</li>
         *   <li>Otherwise, per-item callback set: delivers the most recently
         *       added item, then removes it.</li>
         *   <li>No callback: prints a message to stderr and discards everything.</li>
         * </ul>
         *
         * Items are removed only after the callback returns normally, so a
         * callback that throws sees the same items again on the next round.
         */
        void remove() {
            final IAllConsumer<T> batchHandler;
            final ISignalConsumer<T> itemHandler;
            final SortedMap<Long, T> pending = new TreeMap<>();
            synchronized (dataMap) {
                batchHandler = allConsumer;
                itemHandler = consumer;
                consumeIndex = mapIndex;
                if (dataMap.isEmpty()) {
                    num = 0;
                    try {
                        // Nothing to consume: wait for a producer.
                        dataMap.wait();
                    } catch (InterruptedException e) {
                        // Keep the flag set so the calling loop can terminate.
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                if (batchHandler != null) {
                    // Keys increase with insertion, so the sorted snapshot is oldest-first.
                    pending.putAll(dataMap);
                } else if (itemHandler != null) {
                    // NOTE(review): the highest key is the newest item, i.e. delivery is
                    // newest-first. Kept as in the original; confirm this is intended.
                    Long newest = Collections.max(dataMap.keySet());
                    pending.put(newest, dataMap.get(newest));
                } else {
                    System.err.println("未重写消费信息，将直接扔掉");
                    dataMap.clear();
                    num = 0;
                    dataMap.notifyAll();
                    return;
                }
            }

            // Run user code without holding the monitor so producers are not
            // blocked behind a slow callback.
            if (batchHandler != null) {
                batchHandler.consume(new ArrayList<>(pending.values()));
            } else {
                itemHandler.consume(pending.get(pending.firstKey()));
            }

            synchronized (dataMap) {
                // Remove exactly what was delivered; items added meanwhile have larger keys.
                for (Long key : pending.keySet()) {
                    dataMap.remove(key);
                }
                num = dataMap.size();
                // Space was freed: wake producers waiting for capacity.
                dataMap.notifyAll();
            }
        }

        /**
         * Removes, without delivering them to any callback, every buffered
         * item for which {@code function} returns a non-null value equal to
         * {@code r}. {@code function} is called while the buffer monitor is
         * held, so it should be quick and must not call back into this buffer
         * from another thread.
         *
         * @param function extracts the value to compare from an item
         * @param r        value to match
         * @param <R>      type of the compared value
         * @throws NullPointerException if {@code function} is {@code null}
         */
        public <R> void consumeAll(Function<T, R> function, R r) {
            Objects.requireNonNull(function, "function");
            synchronized (dataMap) {
                if (dataMap.isEmpty()) {
                    return;
                }
                try {
                    dataMap.values().removeIf(item -> {
                        R extracted = function.apply(item);
                        return extracted != null && extracted.equals(r);
                    });
                } finally {
                    num = dataMap.size();
                    // Wake producers that may be waiting for the freed space.
                    dataMap.notifyAll();
                }
            }
        }

    }

    /**
     * Consumer loop. Although it extends {@link Thread}, the enclosing class
     * submits it to an executor as a plain task, so it runs on the executor's
     * worker thread.
     *
     * @param <T> item type
     */
    static class ConsumerThread<T> extends Thread {
        ProducerAndConsumer<T> factory;

        public ConsumerThread(ProducerAndConsumer<T> producerAndConsumer) {
            this.factory = producerAndConsumer;
        }

        @Override
        public void run() {
            // Check the interrupt flag of the thread actually executing this task
            // (not of this never-started Thread object) so the loop can be stopped.
            while (factory != null && !Thread.currentThread().isInterrupted()) {
                try {
                    factory.getFactory().remove();
                } catch (Exception e) {
                    // A failing callback must not kill the consumer; report and continue.
                    e.printStackTrace();
                }

            }
        }
    }


}






